package com.pk.flink.source;

import com.pk.flink.bean.Access;
import org.apache.flink.streaming.api.functions.source.SourceFunction;

import java.util.Random;

public class AccessSource implements SourceFunction<Access> {
    private boolean isRunning = true;
    @Override
    public void run(SourceContext<Access> ctx) throws Exception {
        String[] domains = new String[] {"www.baidu.com","www.google.com","www.perplexity"};
        while(isRunning) {
            ctx.collect(new Access(System.currentTimeMillis(),domains[(new Random()).nextInt(domains.length)],
                    (new Random()).nextDouble() + 1000));

            Thread.sleep(2000);
        }
    }

    @Override
    public void cancel() {
        isRunning = false;
    }
}
